package cn.doitedu.data_sync;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that copies the user registration table from MySQL into HBase.
 *
 * <p>The job registers a {@code mysql-cdc} table over {@code realtimedw.ums_member}, registers an
 * {@code hbase-2.2} table over {@code dim_ums_member}, and then submits one
 * {@code INSERT INTO ... SELECT} statement that moves every source column into the sink:
 * {@code username} stays a top-level column and all other columns are packed into the row-typed
 * field {@code f}.
 */
public class Job01_UserInfo_To_Hbase {

    /** Interval handed to {@code enableCheckpointing}. */
    private static final long CHECKPOINT_INTERVAL = 5000;

    /** Location handed to {@code setCheckpointStorage}. */
    private static final String CHECKPOINT_STORAGE = "file:///d:/ckpt";

    /**
     * DDL for the source table: maps the user registration table {@code ums_member} of the
     * business MySQL database {@code realtimedw} through the {@code mysql-cdc} connector.
     */
    private static final String CREATE_MYSQL_SOURCE_SQL =
            " create table  ums_member_mysql(            " +
            " id bigint,                         " +
            " member_level_id bigint,            " +
            " username string,                   " +
            " phone string,                      " +
            " status int,                        " +
            " create_time timestamp(3) ,         " +
            " gender int,                        " +
            " birthday date ,                    " +
            " city string,                       " +
            " job string,                        " +
            " source_type int,                   " +
            " integration int,                   " +
            " growth int,                        " +
            " luckey_count int,                  " +
            " history_integration int,           " +
            " modify_time timestamp(3),          " +
            " primary key (username) not enforced " +
            " ) with (                          " +
            "     'connector' = 'mysql-cdc',    " +
            "     'hostname' = 'doitedu',       " +
            "     'port' = '3306',              " +
            "     'username' = 'root',          " +
            "     'password' = 'root',          " +
            "     'database-name' = 'realtimedw',   " +
            "     'table-name' = 'ums_member'   " +
            " )                                 ";

    /**
     * DDL for the sink table: maps the user registration dimension table {@code dim_ums_member}
     * in HBase through the {@code hbase-2.2} connector.
     *
     * <p>NOTE(review): presumably {@code username} acts as the HBase row key and {@code f} as the
     * column family, following the Flink HBase connector conventions - confirm against the
     * connector documentation.
     */
    private static final String CREATE_HBASE_SINK_SQL =
            " CREATE TABLE ums_member_hbase (        " +
            "  username STRING,                      " +
            "  f ROW<                                " +
            "   id bigint,                           " +
            " 	member_level_id bigint,              " +
            " 	phone string,                        " +
            " 	status int,                          " +
            "   create_time timestamp(3),            " +
            " 	gender int,                          " +
            " 	birthday date,                       " +
            " 	city string,                         " +
            " 	job string,                          " +
            " 	source_type int,                     " +
            " 	integration int,                     " +
            " 	growth int,                          " +
            " 	luckey_count int,                    " +
            " 	history_integration int,             " +
            " 	modify_time timestamp(3)>,           " +
            "   primary key(username) not enforced	 " +
            " ) WITH (                               " +
            "  'connector' = 'hbase-2.2',            " +
            "  'table-name' = 'dim_ums_member',      " +
            "  'zookeeper.quorum' = 'doitedu:2181'   " +
            " )                                      ";

    /**
     * DML that feeds the sink from the source. The field order inside {@code row(...)} follows
     * the field order declared for {@code f} in the sink DDL.
     */
    private static final String INSERT_INTO_HBASE_SQL =
            " INSERT INTO ums_member_hbase                                     " +
            " SELECT                                                           " +
            "    username,                                                     " +
            "    row(id,member_level_id,phone,status,create_time,              " +
            "       gender,birthday,city,job,source_type,integration,          " +
            " 	  growth,luckey_count,history_integration,modify_time) as f    " +
            " FROM ums_member_mysql                                            ";

    /**
     * Sets up the streaming environment with exactly-once checkpointing and executes the two
     * table definitions followed by the insert statement.
     *
     * @param args command-line arguments; not read by this job
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(CHECKPOINT_INTERVAL, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage(CHECKPOINT_STORAGE);

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Register the table mapped onto the MySQL user registration table.
        tenv.executeSql(CREATE_MYSQL_SOURCE_SQL);

        // Register the table mapped onto the HBase user registration dimension table.
        tenv.executeSql(CREATE_HBASE_SINK_SQL);

        // Run the insert that copies the source rows into the sink.
        tenv.executeSql(INSERT_INTO_HBASE_SQL);
    }
}
